package indi.mozping.test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @author by mozping
 * @Classname MyKafkaProducer
 * @Description TODO
 * @Date 2019/7/20 9:38
 */
public class MyKafkaProducer {

    private static Logger LOG = LoggerFactory.getLogger(MyKafkaProducer.class);

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//        properties.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
//        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        return properties;
    }

    public static final String BROKER_LIST = "kfk1-stg.myhll.cn:9092,kfk2-stg.myhll.cn:9092,kfk3-stg.myhll.cn:9092";
    public static final String TOPIC = "CORE_ORDER_EVENT_ALL";
    public static final String CLIENT_ID = "producer-mzp-1";
    public static final Random random = new Random();
    public static final int CONCURRENT = 60; //每秒并发
    public static final int TIME = 300; //S
    public static final long BASE = 12347000000000L;


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        send();
        //sendAndForget();
        //sendSync();
        //sendAync();
    }

    private static void send() throws ExecutionException, InterruptedException {
        String msg = "{\n" +
                "    \"action\": \"order_cancel\",\n" +
                "    \"action_type\": \"demoData\",\n" +
                "    \"create_time\": 1,\n" +
                "    \"from\": \"demoData\",\n" +
                "    \"data\": {\n" +
                "        \"orderId\": 459964,\n" +
                "        \"userId\": 1,\n" +
                "        \"distance\": 1,\n" +
                "        \"orderDateTime\": \"demoData\",\n" +
                "        \"epId\": 1,\n" +
                "        \"totalPriceFen\": 1,\n" +
                "        \"payPriceFen\": 1,\n" +
                "        \"basePriceFen\": 1,\n" +
                "        \"orderStatus\": 1,\n" +
                "        \"payType\": 1,\n" +
                "        \"orderUuid\": \"demoData\",\n" +
                "        \"orderCreateTime\": \"demoData\",\n" +
                "        \"createTime\": \"demoData\",\n" +
                "        \"orderType\": \"demoData\",\n" +
                "        \"clientType\": \"demoData\",\n" +
                "        \"cityId\": \"demoData\",\n" +
                "        \"vehicleAttr\": 1,\n" +
                "        \"orderVehicleId\": \"demoData\",\n" +
                "        \"latLong\": \"demoData\",\n" +
                "        \"totalDistance\": 1,\n" +
                "        \"businessType\": 1,\n" +
                "        \"hitOnePrice\": 1,\n" +
                "        \"actionCreateTime\": 1,\n" +
                "        \"oldStatus\": 1,\n" +
                "        \"orderFleetStatus\": 1,\n" +
                "        \"isCleanAssign\": true\n" +
                "    }\n" +
                "}";
        JSONObject jsonObject = JSON.parseObject(msg);
        Properties prop = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, jsonObject.toJSONString());
        Future<RecordMetadata> send = producer.send(record);
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    private static void sendAndForget() throws InterruptedException {
        Properties prop = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        OrderEventMessage orderEventMessage = new OrderEventMessage();
        orderEventMessage.setAction("prepay_ok");
        orderEventMessage.setActionType("3013");
        orderEventMessage.setCreateTime(1609816100535L);
        orderEventMessage.setFrom("ucore::pre_pay_notify_order");
        OrderMessage message = new OrderMessage();
        message.setOrderId(Math.abs(random.nextLong()));
        message.setUserId(100003420L);
        message.setDistance(0);
        message.setOrderDateTime("");
        message.setEpId(0L);
        message.setTotalPriceFen(0);
        message.setPayPriceFen(0);
        message.setBasePriceFen(500);
        message.setOrderStatus(0);
        message.setPayType(4);
        message.setOrderUuid("6666666");
        message.setOrderCreateTime("");
        message.setCreateTime("2021-01-05 11:08:19");
        message.setOrderType("1");
        message.setClientType("2");
        message.setCityId("1001");
        message.setVehicleAttr(0);
        message.setOrderVehicleId("4");
        message.setLatLong("22.57755646162|114.0582118285,22.606299720857|114.03407966349");
        message.setTotalDistance(5744);
        message.setBusinessType(1);
        message.setHitOnePrice(0);
        message.setActionCreateTime(0L);
        message.setOrderFleetStatus(0);
        message.setIsCleanAssign(false);
        orderEventMessage.setData(message);

        for (int i = 0; i < TIME; i++) {
            long begin = System.currentTimeMillis();
            for (int j = 0; j < CONCURRENT; j++) {
                OrderEventMessage msg = new OrderEventMessage();
                BeanUtils.copyProperties(orderEventMessage, msg);
                int anInt = random.nextInt(10000 * 10000);
                msg.getData().setOrderId(BASE + Math.abs(anInt));
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, JSONObject.toJSONString(msg));
                producer.send(record);
            }
            producer.flush();
            if (i % 10 == 0) {
                LOG.info("发送:" + orderEventMessage.getData().getOrderId());
            }
            long consume = System.currentTimeMillis() - begin;
            if (consume < 900) {
                Thread.sleep(1000 - (consume) - 20);
            }
        }
        LOG.info("End ... ");
    }

    private static void sendSync() {
        Properties prop = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        //ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello world , Kafka !" + new Date());
        RecordHeaders headers = new RecordHeaders();
        RecordHeader recordHeader = new RecordHeader("headKey", "headValue".getBytes());
        headers.add(recordHeader);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC, 0, System.currentTimeMillis(), null, "Hello world , Kafka !", headers);
        try {
            RecordMetadata recordMetadata = producer.send(record).get();
            System.out.println("发送完毕..." + recordMetadata);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        producer.flush();
        System.out.println("End ... ");
    }

    private static void sendSyncWithFuture() {
        Properties prop = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        //ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello world , Kafka !" + new Date());
        RecordHeaders headers = new RecordHeaders();
        RecordHeader recordHeader = new RecordHeader("headKey", "headValue".getBytes());
        headers.add(recordHeader);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC, 0, System.currentTimeMillis(), null, "Hello world , Kafka !", headers);
        try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata recordMetadata = future.get();
            System.out.println("发送完毕..." + recordMetadata);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        producer.flush();
        System.out.println("End ... ");
    }

    private static void sendAync() {
        Properties prop = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        //ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello world , Kafka !" + new Date());
        RecordHeaders headers = new RecordHeaders();
        RecordHeader recordHeader = new RecordHeader("headKey", "headValue".getBytes());
        headers.add(recordHeader);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC, 0, System.currentTimeMillis(), null, "Hello world , Kafka !", headers);
        try {
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("发送消息异常...");
                        exception.printStackTrace();
                    }
                    System.out.println("发送成功后回调，" + metadata);
                }
            });
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        producer.flush();
        System.out.println("End ... ");
    }


    //ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello world , Kafka !" + new Date());
//        RecordHeaders headers = new RecordHeaders();
//        RecordHeader recordHeader = new RecordHeader("headKey", "headValue".getBytes());
//        headers.add(recordHeader);
//        ProducerRecord<String, String> record =
//                new ProducerRecord<>(TOPIC, 0, System.currentTimeMillis(), null, "Hello world , Kafka !", headers);
}